package com.vaye.im.websocket;

import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 点对点聊天单机版
 * @author wangzhiyong
 * @date 2022年04月04日 上午8:24
 * 1） value = "/ws/{userId}"
 * onOpen(@PathParam("userId") String userId, Session session){ // ... }
 * 这种方式必须在前端在/后面拼接参数 ws://localhost:7889/productWebSocket/123 ，否则404
 *
 * 2） value = "/ws"
 * onOpen(Session session){ // ... }
 * Map<String, List<String>> requestParameterMap = session.getRequestParameterMap();
 * // 获得 ?userId=123 这样的参数
 * @author bart
 */
@Slf4j
@Component
@ServerEndpoint(value = "/im1/{token}/{userId}/{targetUserId}") //添加消息编码器
public class ProductWebSocket {

    //在线人数（单机服务器在线人数统计）
    private static final AtomicInteger onlineCount = new AtomicInteger(0);

    // 当前登录用户的id和websocket session的map
    private static ConcurrentHashMap<String, Session> userIdSessionMap = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(@PathParam("token") String token,@PathParam("userId") String userId, Session session){
        if (StringUtils.isEmpty(token) || !"123".equals(token)) {
                log.error("用户{}身份认证失败",userId);
                throw new IllegalArgumentException("身份认证失败");
        }
        if (ObjectUtils.isEmpty(userId)) {
            log.error("websocket连接 缺少参数 id");
            throw new IllegalArgumentException("websocket连接 缺少参数 id");
        }
        log.info("websocket 新客户端连入，用户id：{},",userId);
        userIdSessionMap.put(userId, session);
        addOnlineCount();
        log.info("当前在线人数：{}人",getOnlineCount());
        // 发送消息返回当前用户
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("code", 200);
        jsonObject.put("message", "OK");
        send(null,userId, JSON.toJSONString(jsonObject));
    }

    /**
     * 服务端接收到信息后调用
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(@PathParam("token") String token,@PathParam("userId") String userId,@PathParam("targetUserId") String targetUserId, String message, Session session) {
        if (StringUtils.isEmpty(token) || !"123".equals(token)) {
            log.error("用户{}身份认证失败",userId);
            throw new IllegalArgumentException("身份认证失败");
        }
        log.info("用户发送过来的消息为：" + message);
        try {
            String msg = "消息发送失败";
            if (StringUtils.isEmpty(targetUserId)) {
                session.getBasicRemote().sendText(msg);
                return;
            }
            send(userId,targetUserId,message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 连接关闭时调用
     */
    @OnClose
    public void onClose(@PathParam("token") String token,@PathParam("userId") String userId, Session session) {
        if (StringUtils.isEmpty(token) || !"123".equals(token)) {
            log.error("用户{}身份认证失败",userId);
            throw new IllegalArgumentException("身份认证失败");
        }
        log.info("一个客户端关闭连接,客户端userId={}",userId);
        userIdSessionMap.remove(userId);
        subOnlineCount();
        log.info("当前在线人数：{}",getOnlineCount());
    }

    /**
     * 服务端websocket出错时调用
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("websocket出现错误");
        error.printStackTrace();
    }

    public static synchronized void subOnlineCount() {
        onlineCount.decrementAndGet();
    }

    public static synchronized int getOnlineCount() {
        return onlineCount.get();
    }

    /**
     * 服务端发送信息给客户端
     * @param userId 用户ID
     * @param message 发送的消息
     */
    public void send(String fromUserId,String userId,String message) {
        log.info("#### 点对点消息，userId={}", userId);
        if (!StringUtils.isEmpty(fromUserId)) {
            message = String.format("%s:%s",fromUserId,message);
        }
        if (MapUtil.isEmpty(userIdSessionMap)) {
            log.error("当前无websocket连接");
            return;
        }
        if (StringUtils.isEmpty(userId)) {  //群发
            String finalMessage = message;
            userIdSessionMap.forEach((k, v) ->{
                try {
                    v.getBasicRemote().sendText(finalMessage);//发送string
                    log.info("推送用户【{}】消息成功，消息为：【{}】", k , finalMessage);
                } catch (IOException e) {
                    e.printStackTrace();
                    log.error("推送用户【{}】消息失败，消息为：【{}】，原因是：【{}】", userId , finalMessage, e.getMessage());
                }

            });
        }else {
            Session session = null;
            if (!userIdSessionMap.containsKey(userId)) {
                log.error("userId={}不在线", userId);
                message = String.format("%s不在线",userId);
                session = userIdSessionMap.get(fromUserId);
            } else {
                session = userIdSessionMap.get(userId);
            }
            try {
                session.getBasicRemote().sendText(message);//发送string
                log.info("推送用户【{}】消息成功，消息为：【{}】", userId , message);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("推送用户【{}】消息失败，消息为：【{}】，原因是：【{}】", userId , message, e.getMessage());
            }

        }
    }

    public static synchronized void addOnlineCount() {
        onlineCount.incrementAndGet();
    }
}
